Decode old-style nested Xcom value #31866
The docker-compose quick start test is failing; other than that, it LGTM.
airflow/serialization/serde.py
Outdated
@@ -276,7 +276,11 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:
def _convert(old: dict) -> dict:
    """Converts an old style serialization to new style."""
    if OLD_TYPE in old and OLD_DATA in old:
        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
        # Added to handle for handling xcom data from airflow 2.5.2
This comment feels a little weird to me. Is this really just 2.5.2, or <2.6?
same thought (comment)
I'm not sure which Airflow versions generate this encoded value, but I'll check and update the comment here. Sorry, I should have done this in the first iteration.
I think the code comment should apply to versions >=2.4 and <2.6, because the dataset concept was introduced in 2.4. Will update the code comment accordingly.
Also, I was able to replicate the issue on Airflow 2.4.3: running the example DAG dataset_produces_1
produces the data - [{"__type": "airflow.datasets.Dataset", "__source": "dataset_produces_1.producing_task_1", "__var": {"__var": {"uri": "s3://dag1/output_1.txt", "extra": {"__var": {"hi": "bye"}, "__type": "dict"}}, "__type": "dict"}}]
and the issue was not caught by the test case - https://github.com/apache/airflow/blob/main/tests/serialization/test_serde.py#L259-L273, as it doesn't have more than two levels of nested __var
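To make the failure concrete, here is a minimal sketch that applies the double-index conversion quoted in the diff to one element of this payload. The constant values and the name `convert_double_index` are assumptions made for illustration; only the `return` expression is taken from the quoted code.

```python
# Constant values are assumed for illustration; check serde.py for the real ones.
OLD_TYPE = "__type"
OLD_DATA = "__var"
CLASSNAME = "__classname__"
VERSION = "__version__"
DATA = "__data__"
DEFAULT_VERSION = 0


def convert_double_index(old: dict) -> dict:
    """Hypothetical stand-in that reuses the quoted return expression."""
    if OLD_TYPE in old and OLD_DATA in old:
        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
    return old


# One element of the XCom payload reported for Airflow 2.4.3.
payload = {
    "__type": "airflow.datasets.Dataset",
    "__source": "dataset_produces_1.producing_task_1",
    "__var": {
        "__var": {
            "uri": "s3://dag1/output_1.txt",
            "extra": {"__var": {"hi": "bye"}, "__type": "dict"},
        },
        "__type": "dict",
    },
}

converted = convert_double_index(payload)

# Two levels of "__var" are stripped, but the third level under "extra"
# is still in the old wrapped form.
print(converted[DATA]["extra"])
```

Under these assumptions the outer two wrappers are removed, while `extra` keeps its `__var`/`__type` wrapper, which matches the observation that a fixed number of unwrapping steps cannot cover deeper nesting.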
I can't find any encoder that adds __var inside a nested __var object, or why the code was changed from
@staticmethod
def _convert(old: dict) -> dict:
    """Converts an old style serialization to new style"""
    if OLD_TYPE in old and OLD_SOURCE in old:
        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA]}
to
def _convert(old: dict) -> dict:
    """Converts an old style serialization to new style"""
    if OLD_TYPE in old and OLD_DATA in old:
        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
    return old
in #31866.
I wonder if it was a bug in that PR, and whether we just need to remove it instead of adding a condition for 2.5.2.
BTW, this PR was released in 2.6.0, so we will have the same problem with all the previous versions. Maybe we need to test 2.5.3 and 2.4.3 before merging this one, in order to update the comment:
# Added to handle for handling xcom data from airflow 2.5.2
@bolkedebruin wdyt?
@hussein-awala In Airflow 2.4.3 if you run dag
And I think Also, in the old test case, we had only tests for the below data -
But if we explicitly handle the case for dict and do not assume the nested levels of
Looking
Which is the nested form of the old-style serialization. I think the solution is correct, but the commit message and the inline comments could be improved. As mentioned, this code was already handling pre-2.6 data, but it wasn't handling the nested version of that correctly. Also, I think the test could use some improvement or addition. The above
The deserializer was not properly dealing with nested and wrapped old-style xcom values. --------- Co-authored-by: bolkedebruin <[email protected]> (cherry picked from commit bd32467)
XCom values generated by Airflow 2.5.2 were not being decoded by Airflow 2.6.
There were two issues:
airflow/airflow/serialization/serde.py
Line 273 in 58fca5e
For __type == dict in _convert(), we don't need to wrap the data in a dict again.
closes: #31769
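The second point can be sketched roughly as follows. This is an illustrative sketch, not the merged implementation: the constant values and the helper name `convert_nested` are assumptions. Old-style dict wrappers are returned unwrapped, other old-style values are rewrapped in the new layout, and the walk recurses so that the depth of nesting does not matter.

```python
# Constant values are assumed for illustration; check serde.py for the real ones.
OLD_TYPE = "__type"
OLD_DATA = "__var"
OLD_DICT = "dict"
CLASSNAME = "__classname__"
VERSION = "__version__"
DATA = "__data__"
DEFAULT_VERSION = 0


def convert_nested(value):
    """Hypothetical helper: rewrite old-style values at any nesting depth."""
    if isinstance(value, list):
        return [convert_nested(item) for item in value]
    if not isinstance(value, dict):
        return value
    if OLD_TYPE in value and OLD_DATA in value:
        inner = convert_nested(value[OLD_DATA])
        if value[OLD_TYPE] == OLD_DICT:
            # A plain dict needs no class wrapper; return its contents directly.
            return inner
        return {CLASSNAME: value[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: inner}
    # Ordinary dict: keep the keys and convert the values.
    return {key: convert_nested(item) for key, item in value.items()}


old_xcom = [
    {
        "__type": "airflow.datasets.Dataset",
        "__source": "dataset_produces_1.producing_task_1",
        "__var": {
            "__var": {
                "uri": "s3://dag1/output_1.txt",
                "extra": {"__var": {"hi": "bye"}, "__type": "dict"},
            },
            "__type": "dict",
        },
    }
]

new_xcom = convert_nested(old_xcom)
print(new_xcom)
```

One caveat of this sketch is that any user dict that happens to contain both `__type` and `__var` keys would also be treated as old-style data.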